package com.starzy.mr;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

/**
 * @Author: starzy https://www.cnblogs.com/starzy
 * @Description: Finds pairs of users who follow each other ("mutual fans").
 *  Each input line has the form {@code user:f1,f2,...} meaning {@code user}
 *  follows every user in the comma-separated list. Sample input:
 *      A:B,C,D,F,E,O
 *      B:A,C,E,K
 *      C:F,A,D,I
 *      D:A,E,F,L
 *      E:B,C,D,M,L
 *      F:A,B,C,D,E,O,M
 *      G:A,C,D,E,F
 *      H:A,C,D,E,O
 *      I:A,O
 *      J:B,O
 *      K:A,C,D
 *      L:D,E,F
 *      M:E,F,G
 *      O:A,H,I,J,K
 * @Date: Created in 14:03 2020/12/15
 */
public class FansMR {

    /**
     * Job driver: configures and submits the mutual-fans MapReduce job.
     *
     * @param args optional overrides: {@code args[0]} = input path,
     *             {@code args[1]} = output path. When absent, the original
     *             hard-coded local paths are used, so existing invocations
     *             keep working unchanged.
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // HDFS-related configuration; uncomment to target a remote cluster
        // instead of the local filesystem.
        Configuration conf = new Configuration();
        //conf.set("fs.defaultFS", "hdfs://hadoop02:9000");
        //System.setProperty("HADOOP_USER_NAME", "hadoop");

        Job job = Job.getInstance(conf);
        // Lets the framework locate the jar containing this driver class
        // when the job is submitted to a cluster.
        job.setJarByClass(FansMR.class);

        // Wire up the mapper and reducer.
        job.setMapperClass(FansMapper.class);
        job.setReducerClass(FansReducer.class);

        // Map output: canonical pair key ("A-B") -> name of the emitting user.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        // Final output: the pair key only, one mutual pair per line.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        // Input/output paths: CLI args when given, original defaults otherwise.
        Path inputPath = new Path(args.length > 0 ? args[0] : "D:/data/friends/input");
        Path outputPath = new Path(args.length > 1 ? args[1] : "D:/data/friends/output");
        FileSystem fs = FileSystem.get(conf);
        // MapReduce refuses to start if the output directory already exists,
        // so remove any stale result from a previous run.
        if (fs.exists(outputPath)) {
            fs.delete(outputPath, true);
        }
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // Submit and block until completion; exit code reflects job success.
        boolean waitForCompletion = job.waitForCompletion(true);
        System.exit(waitForCompletion ? 0 : 1);
    }

    /**
     * For each input line {@code P:F1,F2,...}, emits one record per follower F:
     * key = canonical pair "min-max" of (P, F), value = P.
     *
     * If P follows F AND F follows P, the shared pair key ends up with both
     * names among its values, which the reducer detects.
     */
    static class FansMapper extends Mapper<LongWritable, Text, Text, Text> {
        // Reuse output writables across map() calls — standard Hadoop idiom
        // to avoid one allocation per emitted record.
        private final Text pairKey = new Text();
        private final Text personOut = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString().trim();
            int sep = line.indexOf(':');
            // Skip blank or malformed lines (no ':' separator, empty user, or
            // empty follower list) instead of throwing out of tokens[1].
            if (sep <= 0 || sep == line.length() - 1) {
                return;
            }
            String person = line.substring(0, sep).trim();
            personOut.set(person);

            /*
             * Emitted data shape:
             *   A-B ---> A   (from A's line, because A follows B)
             *   A-B ---> B   (from B's line, because B follows A)
             * A mutual pair is exactly a key whose values contain both names.
             */
            for (String fan : line.substring(sep + 1).split(",")) {
                fan = fan.trim();
                if (fan.isEmpty()) {
                    continue; // tolerate stray commas like "A:B,,C"
                }
                pairKey.set(combineStr(person, fan));
                context.write(pairKey, personOut);
            }
        }

        /**
         * Canonicalizes an unordered pair so that (A,B) and (B,A) both map to
         * the same key "A-B" (lexicographically smaller name first).
         */
        private String combineStr(String a, String b) {
            return a.compareTo(b) > 0 ? b + "-" + a : a + "-" + b;
        }
    }

    /**
     * A pair "A-B" is mutual iff BOTH A and B emitted it. Distinct names are
     * collected in a Set rather than counted, so a duplicate friend entry in
     * the input (e.g. "A:B,B") cannot inflate a one-sided pair to a count of
     * 2 and produce a false positive.
     */
    static class FansReducer extends Reducer<Text, Text, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            Set<String> members = new HashSet<>();
            for (Text t : values) {
                members.add(t.toString());
            }
            if (members.size() == 2) {
                context.write(key, NullWritable.get());
            }
        }
    }
}
